package com.carol.bigdata.task.feature

import com.carol.bigdata.Config
import com.carol.bigdata.constant.KVConstant
import com.carol.bigdata.utils.HBaseUtil
import com.carol.bigdata.utils.{Flag, TimeUtil}
import com.carol.bigdata.Config.hbaseParams
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import com.carol.bigdata.utils.FeatureUtil.{calFeat, calTotal, changeProfileRDD, getDataRowKey}
import com.carol.bigdata.utils.RddReader



/**
 * Spark job that computes per-user preference ("favor_tag") features for one statistics day,
 * writes the daily values to the HBase user-profile table, then combines them with the previous
 * day's profile row (via `calTotal`) and writes the resulting `_total` columns.
 */
object CalFavorTag {

    // Basic key fields
    val keyColumns: List[String] = KVConstant.keyColumns
    val keyLen: Int = keyColumns.length
    val cf: String = KVConstant.cf
    val timeField: String = KVConstant.timeField
    val propertiesField: String = KVConstant.propertiesField

    // Data source: the logout / action table
    val logoutGameTable: String = KVConstant.actionTable
    val logoutGameColumns: List[String] = keyColumns :+ "hour"
    // NOTE(review): despite the "logout" naming, rows are selected with KVConstant.loginPattern
    // (KVConstant.logoutPattern was switched off). This matches run(), which uses
    // calFavorInfoWithLogin rather than calFavorInfo. Confirm this is intended.
    val logoutGamePattern: String = KVConstant.loginPattern
    val logoutPropColumns: List[String] = List("online_time")

    // Game daily intermediate table / user statistics result table
    val userProfileTable: String = KVConstant.userProfileTable
    val keyCF: String = KVConstant.keyCF
    val keyCols: List[String] = KVConstant.dateKeyColumns
    val writeKeyCols: List[String] = KVConstant.writeKeyCols
    val event: String = "profile"

    // User preference feature column family, and the features as (name, type) pairs
    val favorMiddleCF: String = "favor_tag"
    val favorFeat: List[(String, String)] = List(
        ("$game", "map"),
        ("$login_time", "map")
        //("$online_time", "int")
    )

    /**
     * Shared aggregation used by both favor calculators.
     *
     * Rows are grouped by their first `keyLen` columns. The values produced by `toValues` are
     * collected column-wise per key, and the collected columns are passed to `calFeat`.
     *
     * As a debug aid it prints the result count, the feature list and up to five sample rows.
     * These are Spark actions, so each one runs a job.
     *
     * @param statDay  statistics day, prepended to every output key
     * @param rows     input rows whose first `keyLen` columns form the grouping key
     * @param featList feature (name, type) pairs forwarded to `calFeat`
     * @param toValues extracts the value columns of a single row. Every row should yield the same
     *                 number of values, because `zip` truncates to the shorter side.
     * @return one `(statDay +: key, calFeat result)` pair per distinct key
     */
    private def aggregateFavor(statDay: String,
                               rows: RDD[List[String]],
                               featList: List[(String, String)])
                              (toValues: List[String] => List[String]): RDD[(List[String], List[String])] = {
        val favorInfoRDD = rows
          .map(row => (row.take(keyLen), toValues(row).map(List(_))))
          // Column-wise merge: the i-th list accumulates the i-th value of every row sharing the key.
          .reduceByKey((left, right) => left.zip(right).map { case (l, r) => l ++ r })
          .map { case (key, columns) => (statDay +: key, calFeat(columns, featList)) }
        println(s"favorInfoRDD: ${favorInfoRDD.count()}")
        println(s"feat: ${featList}")
        favorInfoRDD.take(5).foreach(println)
        favorInfoRDD
    }

    /**
     * Builds favor features from rows laid out as `key columns ++ List(loginTime, onlineTime)`.
     *
     * Each row contributes three values:
     *  - its first column (presumably the game id; confirm against KVConstant.keyColumns),
     *  - the last two characters of `loginTime`,
     *  - `onlineTime`.
     *
     * A row that does not have exactly two non-key columns raises a MatchError inside the task,
     * which fails the Spark job.
     *
     * @param statDay     statistics day, prepended to every output key
     * @param logoutUsers input rows
     * @param featList    feature (name, type) pairs forwarded to `calFeat`
     * @return one `(statDay +: key, features)` pair per distinct key
     */
    def calFavorInfo(statDay: String,
                     logoutUsers: RDD[List[String]],
                     featList: List[(String, String)]): RDD[(List[String], List[String])] =
        aggregateFavor(statDay, logoutUsers, featList) { row =>
            val List(loginTime, onlineTime) = row.drop(keyLen)
            List(row.head, loginTime.takeRight(2), onlineTime)
        }

    /**
     * Builds favor features from rows whose last column is a login time.
     *
     * Each row contributes two values:
     *  - its first column (presumably the game id),
     *  - the last two characters of its last column.
     *
     * run() reads the "hour" column last, so this is presumably the hour of day; verify against
     * the stored format.
     *
     * @param statDay      statistics day, prepended to every output key
     * @param loginUserRDD input rows
     * @param featList     feature (name, type) pairs forwarded to `calFeat`
     * @return one `(statDay +: key, features)` pair per distinct key
     */
    def calFavorInfoWithLogin(statDay: String,
                              loginUserRDD: RDD[List[String]],
                              featList: List[(String, String)]): RDD[(List[String], List[String])] =
        aggregateFavor(statDay, loginUserRDD, featList) { row =>
            List(row.head, row.last.takeRight(2))
        }

    /**
     * Runs the favor-feature pipeline for one game and one statistics day.
     *
     * The steps are:
     *  1. Read the source rows from Hive.
     *  2. Aggregate them with calFavorInfoWithLogin.
     *  3. Write the daily features to `userProfileTable`.
     *  4. Call `calTotal`, passing yesterday's profile row key, and write the `_total` columns.
     *
     * @param hbaseParams HBase connection parameters, forwarded to HBaseUtil / calTotal
     * @param spark       active Spark session (Hive support is required by the reader)
     * @param statDay     statistics day, in the format expected by TimeUtil.getTimeDeltaDay
     * @param game        game id used to filter the source rows and build row keys
     */
    def run(hbaseParams: Map[String, String],
            spark: SparkSession,
            statDay: String,
            game: String): Unit = {
        println("计算偏好特征")
        val yesterday: String = TimeUtil.getTimeDeltaDay(statDay, -1)
        val yesterdayRowKey = List(getDataRowKey(game, event, yesterday))

        // 1. Read the source rows from Hive.
        //    The RDD is cached because several actions consume it (the debug count/take below and
        //    the aggregation job). Without caching, each of those actions re-reads the Hive table.
        val logoutUsers = RddReader
          .readHiveRDD(spark, logoutGameTable, statDay, logoutGamePattern, game, logoutGameColumns)
          .cache()
        try {
            println(s"logoutUsers: ${logoutUsers.count()}")
            logoutUsers.take(5).foreach(println)

            // 2. Aggregate per key into (statDay +: key, features).
            //val favorRDD = calFavorInfo(statDay, logoutUsers, favorFeat)
            val favorRDD = calFavorInfoWithLogin(statDay, logoutUsers, favorFeat)
            val writeFavorRDD = changeProfileRDD(favorRDD)

            // 3. Write the daily favor features to the HBase intermediate table.
            val favorFeatColumns = favorFeat.map(_._1)
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeFavorRDD, keyCF, writeKeyCols,
                favorFeatColumns, List.fill(favorFeatColumns.length)(favorMiddleCF))

            // 4. Compute totals.
            //    calTotal is given yesterday's profile row key, presumably so it can merge with the
            //    previous totals. It is expected to return exactly two RDDs (int features, map features).
            val List(totalIntRDD, totalMapRDD) = calTotal(hbaseParams, spark, statDay, favorRDD, userProfileTable,
                List(keyCF, favorMiddleCF), keyCols, favorFeat, yesterdayRowKey)
            val writeTotalIntRDD = changeProfileRDD(totalIntRDD)
            val writeTotalMapRDD = changeProfileRDD(totalMapRDD)

            // 5. Write the totals.
            //    NOTE(review): favorFeat currently has no "int" features, so totalIntColumns is empty.
            val totalIntColumns = favorFeat.filter(_._2.toLowerCase == "int").map(_._1 + "_total")
            val totalMapColumns = favorFeat.filter(_._2.toLowerCase == "map").map(_._1 + "_total")
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalIntRDD, keyCF, writeKeyCols,
                totalIntColumns, List.fill(totalIntColumns.length)(favorMiddleCF))
            HBaseUtil.writeRDD2KVColumn(hbaseParams, userProfileTable, writeTotalMapRDD, keyCF, writeKeyCols,
                totalMapColumns, List.fill(totalMapColumns.length)(favorMiddleCF))
        } finally {
            // Release the cache so repeated run() calls from main do not accumulate cached data.
            logoutUsers.unpersist()
        }
    }

    /**
     * Entry point.
     *
     * Parses flags, builds a Hive-enabled Spark session and runs the pipeline for the hard-coded
     * games and days. The session is always stopped, even when a run fails.
     */
    def main(args: Array[String]): Unit = {
        Flag.Parse(args)
        val spark = SparkSession.builder()
          .config(conf = Config.sparkConf)
          .enableHiveSupport()
          .getOrCreate()
        spark.sparkContext.setLogLevel("ERROR")
        //val ipPath: String = sc.broadcast(Config.ipPath).value
        val gameList = List("200")
        // NOTE(review): timezones are iterated but never passed to run(), so each entry only
        // repeats the same job.
        val timezoneList = List("Asia/Bangkok")
        try {
            for {
                game <- gameList
                _ <- timezoneList
                day <- 1 to 1
            } {
                val statday = "2021-08-%02d".format(day)
                TimeUtil.timer(run(hbaseParams, spark, statday, game))
            }
        } finally {
            spark.stop()
        }
    }
}
